package org.infinispan.replication;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.atomic.AtomicMapLookup;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.remoting.ReplicationQueue;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "replication.ReplicationQueueTest")
/* loaded from: input_file:org/infinispan/replication/ReplicationQueueTest.class */
public class ReplicationQueueTest extends MultipleCacheManagersTest {
    private static final int REPL_QUEUE_INTERVAL = 1000;
    private static final int REPL_QUEUE_MAX_ELEMENTS = 10;
    private Cache<Object, Object> cache1;
    private Cache<Object, Object> cache2;

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        registerCacheManager(TestCacheManagerFactory.createClusteredCacheManager(createGlobalConfigurationBuilder(), new ConfigurationBuilder()), TestCacheManagerFactory.createClusteredCacheManager(createGlobalConfigurationBuilder(), new ConfigurationBuilder()));
        manager(0).defineConfiguration("replQueue", createCacheConfig(true));
        manager(1).defineConfiguration("replQueue", createCacheConfig(false));
        this.cache1 = cache(0, "replQueue");
        this.cache2 = cache(1, "replQueue");
    }

    private GlobalConfigurationBuilder createGlobalConfigurationBuilder() {
        return GlobalConfigurationBuilder.defaultClusteredBuilder();
    }

    private Configuration createCacheConfig(boolean z) {
        ConfigurationBuilder defaultClusteredCacheConfig = getDefaultClusteredCacheConfig(CacheMode.REPL_ASYNC, true);
        defaultClusteredCacheConfig.clustering().async().useReplQueue(z).replQueueInterval(1000L).replQueueMaxElements(10);
        return defaultClusteredCacheConfig.build();
    }

    public void testReplicationBasedOnTime() throws Exception {
        this.cache1.put("key", "value");
        ReplicationQueue replicationQueue = (ReplicationQueue) TestingUtil.extractComponent(this.cache1, ReplicationQueue.class);
        AssertJUnit.assertNotNull(replicationQueue);
        AssertJUnit.assertEquals(1, replicationQueue.getElementsCount());
        AssertJUnit.assertNull(this.cache2.get("key"));
        AssertJUnit.assertEquals("value", this.cache1.get("key"));
        replicationQueue.flush();
        eventually(new AbstractInfinispanTest.Condition() { // from class: org.infinispan.replication.ReplicationQueueTest.1
            @Override // org.infinispan.test.AbstractInfinispanTest.Condition
            public boolean isSatisfied() throws Exception {
                return ReplicationQueueTest.this.cache2.get("key") != null;
            }
        });
        AssertJUnit.assertEquals(this.cache2.get("key"), "value");
        AssertJUnit.assertEquals(0, replicationQueue.getElementsCount());
    }

    public void testReplicationBasedOnTimeWithTx() throws Exception {
        TransactionManager transactionManager = TestingUtil.getTransactionManager(this.cache1);
        transactionManager.begin();
        this.cache1.put("key", "value");
        transactionManager.commit();
        ReplicationQueue replicationQueue = (ReplicationQueue) TestingUtil.extractComponent(this.cache1, ReplicationQueue.class);
        AssertJUnit.assertNotNull(replicationQueue);
        AssertJUnit.assertEquals(replicationQueue.getElementsCount(), 1);
        AssertJUnit.assertNull(this.cache2.get("key"));
        AssertJUnit.assertEquals(this.cache1.get("key"), "value");
        replicationQueue.flush();
        eventually(new AbstractInfinispanTest.Condition() { // from class: org.infinispan.replication.ReplicationQueueTest.2
            @Override // org.infinispan.test.AbstractInfinispanTest.Condition
            public boolean isSatisfied() throws Exception {
                return ReplicationQueueTest.this.cache2.get("key") != null;
            }
        });
        AssertJUnit.assertEquals(this.cache2.get("key"), "value");
        AssertJUnit.assertEquals(0, replicationQueue.getElementsCount());
    }

    public void testReplicationBasedOnSize() throws Exception {
        for (int i = 0; i < 10; i++) {
            this.cache1.put("key" + i, "value" + i);
        }
        eventually(new AbstractInfinispanTest.Condition() { // from class: org.infinispan.replication.ReplicationQueueTest.3
            @Override // org.infinispan.test.AbstractInfinispanTest.Condition
            public boolean isSatisfied() throws Exception {
                return ReplicationQueueTest.this.cache2.size() == 10;
            }
        });
        for (int i2 = 0; i2 < 10; i2++) {
            AssertJUnit.assertEquals("value" + i2, this.cache2.get("key" + i2));
        }
    }

    public void testReplicationBasedOnSizeWithTx() throws Exception {
        TransactionManager transactionManager = TestingUtil.getTransactionManager(this.cache1);
        for (int i = 0; i < 10; i++) {
            transactionManager.begin();
            this.cache1.put("key" + i, "value" + i);
            transactionManager.commit();
        }
        eventually(new AbstractInfinispanTest.Condition() { // from class: org.infinispan.replication.ReplicationQueueTest.4
            @Override // org.infinispan.test.AbstractInfinispanTest.Condition
            public boolean isSatisfied() throws Exception {
                return ReplicationQueueTest.this.cache2.size() == 10;
            }
        });
        for (int i2 = 0; i2 < 10; i2++) {
            AssertJUnit.assertEquals("value" + i2, this.cache2.get("key" + i2));
        }
    }

    public void testReplicationQueueMultipleThreads() throws Exception {
        runConcurrently(new Callable() { // from class: org.infinispan.replication.ReplicationQueueTest.5
            AtomicInteger indexOffset = new AtomicInteger();

            @Override // java.util.concurrent.Callable
            public Void call() {
                int andIncrement = this.indexOffset.getAndIncrement();
                for (int i = 0; i < 3; i++) {
                    ReplicationQueueTest.this.cache1.put("key" + andIncrement + "_" + i, "value");
                }
                return null;
            }
        }, 4);
        eventually(new AbstractInfinispanTest.Condition() { // from class: org.infinispan.replication.ReplicationQueueTest.6
            @Override // org.infinispan.test.AbstractInfinispanTest.Condition
            public boolean isSatisfied() throws Exception {
                return ReplicationQueueTest.this.cache2.size() == 12;
            }
        });
        AssertJUnit.assertEquals(0, ((ReplicationQueue) TestingUtil.extractComponent(this.cache1, ReplicationQueue.class)).getElementsCount());
    }

    public void testAtomicHashMap() throws Exception {
        TransactionManager transactionManager = TestingUtil.getTransactionManager(this.cache1);
        transactionManager.begin();
        AtomicMapLookup.getAtomicMap(this.cache1, "foo").put("sub-key", "sub-value");
        transactionManager.commit();
        ((ReplicationQueue) TestingUtil.extractComponent(this.cache1, ReplicationQueue.class)).flush();
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 5000 && this.cache2.get("foo") == null) {
            Thread.sleep(50L);
        }
        AssertJUnit.assertNotNull(AtomicMapLookup.getAtomicMap(this.cache2, "foo", false));
        AssertJUnit.assertNotNull(AtomicMapLookup.getAtomicMap(this.cache2, "foo").get("sub-key"));
        AssertJUnit.assertEquals("sub-value", AtomicMapLookup.getAtomicMap(this.cache2, "foo").get("sub-key"));
    }
}
